package cn.spark.study.core

import org.apache.spark.{SparkConf, SparkContext}

/**
 * Demonstrates the common Spark RDD transformation operators
 * (map, filter, flatMap, groupByKey, reduceByKey, sortByKey, join, cogroup),
 * one self-contained demo method per operator.
 *
 * Each demo builds its own local SparkContext and stops it when done;
 * only one SparkContext may be active per JVM, so `main` runs exactly
 * one demo per invocation.
 */
object TransformationOperation {

  /** Entry point: uncomment exactly one demo to run it. */
  def main(args: Array[String]): Unit = {
    // map()
    // filter()
    // flatMap()
    // groupByKey()
    // reduceByKey()
    // sortByKey()
    // join()
    cogroup()
  }

  /** Builds a local-mode SparkContext with the given application name. */
  private def makeContext(appName: String): SparkContext =
    new SparkContext(new SparkConf().setAppName(appName).setMaster("local"))

  /**
   * Runs `body` against a fresh local SparkContext and always stops the
   * context afterwards (the original demos leaked it, which blocks any
   * second demo from running in the same JVM).
   */
  private def withContext(appName: String)(body: SparkContext => Unit): Unit = {
    val sc = makeContext(appName)
    try body(sc)
    finally sc.stop()
  }

  /** map(): doubles each element of a small numeric RDD and prints the results. */
  def map(): Unit = withContext("map") { sc =>
    val numbers = Array(1, 2, 3, 4, 5)
    val numbersRDD = sc.parallelize(numbers, 1)
    val resultRDD = numbersRDD.map(number => number * 2)
    resultRDD.foreach(r => println(r))
  }

  /** filter(): keeps only even numbers and prints them. */
  def filter(): Unit = withContext("filter") { sc =>
    val numbers = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
    // The original bound this Unit-returning pipeline to a val; the
    // meaningless binding has been removed.
    sc.parallelize(numbers, 1)
      .filter(num => num % 2 == 0)
      .foreach(num => println(num))
  }

  /** flatMap(): splits each line into words and prints every word. */
  def flatMap(): Unit = withContext("flatMap") { sc =>
    val lineArray = Array("hello me", "hello you", "hello word")
    val lines = sc.parallelize(lineArray)
    val words = lines.flatMap(line => line.split(" "))
    words.foreach(word => println(word))
  }

  /** groupByKey(): groups scores by class name and prints each group. */
  def groupByKey(): Unit = withContext("groupByKey") { sc =>
    val scoreArray = Array(("class1", 80), ("class2", 75), ("class1", 90), ("class2", 65))
    val scores = sc.parallelize(scoreArray)
    val groupedScores = scores.groupByKey()
    groupedScores.foreach(score => {
      println("班级：" + score._1)
      score._2.foreach(singleScore => println(singleScore))
      println("==================================")
    })
  }

  /** reduceByKey(): sums the scores per class and prints each total. */
  def reduceByKey(): Unit = withContext("reduceByKey") { sc =>
    val scoreArray = Array(("class1", 80), ("class2", 75), ("class1", 90), ("class2", 65))
    val scores = sc.parallelize(scoreArray)
    val totalScores = scores.reduceByKey(_ + _)
    totalScores.foreach(score => {
      println(score._1 + "：" + score._2)
    })
  }

  /** sortByKey(): sorts (score, name) pairs by score descending and prints them. */
  def sortByKey(): Unit = withContext("sortByKey") { sc =>
    val scoreArray = Array((65, "leo"), (50, "tom"), (100, "marry"), (85, "jack"))
    val scores = sc.parallelize(scoreArray)
    // `ascending = false` sorts highest score first.
    val sortedScore = scores.sortByKey(false)
    sortedScore.foreach(score => println(score._2 + ":" + score._1))
  }

  /** join(): inner-joins students with their single score by student id. */
  def join(): Unit = withContext("join") { sc =>
    val studentArray = Array((1, "leo"), (2, "marry"), (3, "jack"))
    val scoreArray = Array((1, 85), (2, 100), (3, 60))
    val students = sc.parallelize(studentArray)
    val scores = sc.parallelize(scoreArray)
    // join yields (id, (name, score)) for every matching id.
    val studentScores = students.join(scores)
    studentScores.foreach(studentScore => {
      println("编号：" + studentScore._1)
      println("姓名：" + studentScore._2._1)
      println("成绩：" + studentScore._2._2)
      println("===========================")
    })
  }

  /** cogroup(): groups, per student id, ALL names and ALL scores as Iterables. */
  def cogroup(): Unit = withContext("cogroup") { sc =>
    val studentArray = Array((1, "leo"), (2, "marry"), (3, "jack"))
    val scoreArray = Array((1, 85), (2, 100), (3, 60), (1, 75), (2, 90), (3, 50))
    val students = sc.parallelize(studentArray)
    val scores = sc.parallelize(scoreArray)
    // Unlike join, cogroup yields (id, (Iterable[name], Iterable[score])),
    // collecting every value from each side per key.
    val studentScores = students.cogroup(scores)
    studentScores.foreach(studentScore => {
      println("编号：" + studentScore._1)
      println("姓名：" + studentScore._2._1)
      println("成绩：" + studentScore._2._2)
      println("===========================")
    })
  }
}
